package com.coalmine.iec104.modules.handle;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.coalmine.iec104.config.constant.IecConfig;
import com.coalmine.iec104.modules.delayed.entity.ItemDelayed;
import com.coalmine.iec104.modules.domain.ReportDate;
import com.coalmine.iec104.modules.domain.UploadMsgInfo;
import com.coalmine.iec104.modules.service.ReportDataService;
import com.coalmine.iec104.modules.util.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.compress.utils.Lists;
import org.openmuc.j60870.*;
import org.openmuc.j60870.ie.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

/**
 * @Author mkq
 * @Date 2023-10-30 11:19
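 * @Description Spring component that maintains an IEC 60870-5-104 client connection (with reconnect),
 *              periodically assembles telemetry / remote-signal ASDUs from cached point values and
 *              sends them through a delay queue.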
 */
@Slf4j
@Component
public class IEC104Handle {


    /** Connection settings read by this class: url, port, upload rate and common address. */
    @Resource
    private IecConfig iecConfig;

    /** Delay queue that buffers assembled ASDUs until the sender worker takes them. */
    @Resource
    private DelayMsgImpl delayMsgImpl;

    /** Supplies the list of points that have to be reported. */
    @Resource
    private ReportDataService reportDataService;

    /** Cache holding the latest point values under keys of the form {@code "UPLOAD:" + systemCode}. */
    @Resource
    private RedisCache redisCache;

    /** Target URL handed to {@code UploadUtil.saveUploadData} when a sent message is recorded. */
    @Value("${upload.msgSaveUrl}")
    private String msgSaveUrl;


    /**
     * Current client connection, or {@code null} before the first successful connect.
     * Declared {@code volatile} because it is assigned by the connect/reconnect thread and read by
     * the scheduler thread, the sender thread and the connection listener callbacks.
     */
    volatile Connection clientConnection;

    /**
     * Reconnect flag. Despite its name, {@code true} means a connection has been established and
     * no (re)connect is needed; {@code false} means the connect loop must build a new connection.
     * It is set to {@code true} after a successful connect and back to {@code false} by the
     * listener when the connection is closed.
     */
    volatile AtomicBoolean isClosed;

    /**
     * Bootstraps the component once its dependencies have been injected: resets the connection
     * flag, starts the connect/reconnect loop on a dedicated single-thread executor, then starts
     * the periodic ASDU builder and the delay-queue sender.
     */
    @PostConstruct
    public void init() {
        isClosed = new AtomicBoolean(false);
        ExecutorService connectExecutor = Executors.newSingleThreadExecutor();
        connectExecutor.execute(() -> iniIecConnect());
        // Periodically assemble the ASDU messages.
        buildAsduMsg();
        // Send the assembled ASDU messages through the delay queue.
        sendMsg();
    }

    /**
     * Schedules {@link #assembleMsg()} on a one-thread scheduled pool: first run after 10 seconds,
     * then at a fixed rate of {@code iecConfig.getUploadRate()} seconds.
     */
    private void buildAsduMsg() {
        Runnable assembleTask = () -> {
            try {
                assembleMsg();
            } catch (Exception e) {
                // Catch everything: an exception escaping the task would cancel all later runs.
                log.error("assembleMsg error" + e.getMessage(), e);
            }
        };
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        scheduler.scheduleAtFixedRate(assembleTask, 10, iecConfig.getUploadRate(), TimeUnit.SECONDS);
    }

    /**
     * Assembles the periodic upload ASDUs and places them on the delay queue.
     * <ol>
     *   <li>Loads the report points, keeping only those with a system code, a point key and a type.</li>
     *   <li>Looks up each point's cached value and wraps it in an information object
     *       (type {@code "1"} = telemetry, type {@code "2"} = remote signal).</li>
     *   <li>Builds one spontaneous {@code M_ME_NC_1} ASDU and one spontaneous {@code M_SP_NA_1}
     *       ASDU, each only when it has at least one object, and queues them.</li>
     * </ol>
     * Does nothing while there is no open client connection or when no report points are returned.
     */
    private void assembleMsg() {
        if (StringUtils.isNull(clientConnection) || clientConnection.isClosed()) {
            return;
        }
        List<ReportDate> configured = reportDataService.getReportData();
        if (CollectionUtils.isEmpty(configured)) {
            return;
        }
        // Entries without a type are dropped here: groupingBy rejects null keys with an NPE,
        // which would otherwise abort the whole cycle because of a single bad record.
        List<ReportDate> reports = configured.stream()
                .filter(u -> u != null
                        && StringUtils.isNotBlank(u.getSystemCode())
                        && StringUtils.isNotBlank(u.getPointKey())
                        && u.getType() != null)
                .collect(Collectors.toList());
        Map<String, List<ReportDate>> points = reports.stream().collect(Collectors.groupingBy(ReportDate::getType));

        // Telemetry
        List<InformationObject> telemetryList = toTelemetryObjects(points.get("1"));
        if (CollectionUtils.isNotEmpty(telemetryList)) {
            ASdu telemetry = spontaneousAsdu(ASduType.M_ME_NC_1, telemetryList);
            delayMsgImpl.addToDelayQueue(new UploadMsgInfo("1", telemetry));
        }

        // Remote signal
        List<InformationObject> remoteSignalList = toRemoteSignalObjects(points.get("2"));
        if (CollectionUtils.isNotEmpty(remoteSignalList)) {
            ASdu remoteSignal = spontaneousAsdu(ASduType.M_SP_NA_1, remoteSignalList);
            ItemDelayed<UploadMsgInfo> delayedItem = new ItemDelayed<>(System.currentTimeMillis(), 4);
            delayedItem.setData(new UploadMsgInfo("2", remoteSignal));
            delayMsgImpl.addToDelayQueue(delayedItem);
        }
    }

    /**
     * Converts telemetry points into short-float information objects with all quality flags false.
     * Points without a cached value are skipped; points that fail to convert are logged and skipped.
     */
    private List<InformationObject> toTelemetryObjects(List<ReportDate> telemetryDates) {
        List<InformationObject> telemetryList = Lists.newArrayList();
        if (CollectionUtils.isEmpty(telemetryDates)) {
            return telemetryList;
        }
        for (ReportDate reportDate : telemetryDates) {
            Object real = getRealValue(reportDate);
            if (StringUtils.isNull(real)) {
                continue;
            }
            try {
                InformationObject info = new InformationObject(Integer.parseInt(reportDate.getPointCode()),
                        new InformationElement[][]{{new IeShortFloat(Float.parseFloat(real.toString())), new IeQuality(false, false, false, false, false)}});
                telemetryList.add(info);
            } catch (Exception e) {
                log.error("新增遥测上报点位详情异常" + reportDate.toString(), e);
            }
        }
        return telemetryList;
    }

    /**
     * Converts remote-signal points into single-point information objects with all quality flags false.
     * Points without a cached value are skipped; a cached value that is not a {@code Boolean}
     * fails the cast, is logged and skipped.
     */
    private List<InformationObject> toRemoteSignalObjects(List<ReportDate> remoteSignalDates) {
        List<InformationObject> remoteSignalList = Lists.newArrayList();
        if (CollectionUtils.isEmpty(remoteSignalDates)) {
            return remoteSignalList;
        }
        for (ReportDate reportDate : remoteSignalDates) {
            Object real = getRealValue(reportDate);
            if (StringUtils.isNull(real)) {
                continue;
            }
            try {
                InformationObject info = new InformationObject(Integer.parseInt(reportDate.getPointCode()),
                        new InformationElement[][]{{new IeSinglePointWithQuality((Boolean) real, false, false, false, false)}});
                remoteSignalList.add(info);
            } catch (Exception e) {
                log.error("新增遥信上报点位详情异常" + reportDate.toString(), e);
            }
        }
        return remoteSignalList;
    }

    /** Wraps the given objects in a spontaneous ASDU of the given type for the configured common address. */
    private ASdu spontaneousAsdu(ASduType type, List<InformationObject> objects) {
        InformationObject[] informationObjects = objects.toArray(new InformationObject[0]);
        return new ASdu(type,
                false,
                CauseOfTransmission.SPONTANEOUS,
                false,
                false,
                0, iecConfig.getCommonAddress(), informationObjects);
    }

    /**
     * Looks up the cached value of a report point.
     * <p>
     * Reads the JSON object cached under {@code "UPLOAD:" + systemCode}, takes its {@code "data"}
     * list and returns the {@code "v"} field of the first entry whose {@code "id"} equals the
     * point key.
     *
     * @param reportDate the point to resolve
     * @return the cached value, or {@code null} when the cache entry, the data list, a matching
     *         entry or its {@code "v"} field is missing
     */
    private Object getRealValue(ReportDate reportDate) {
        JSONObject cached = redisCache.getCacheObject("UPLOAD:" + reportDate.getSystemCode());
        if (StringUtils.isNull(cached)) {
            return null;
        }
        List<JSONObject> entries = cached.getObject("data", new TypeReference<List<JSONObject>>() {
        });
        if (CollectionUtils.isEmpty(entries)) {
            return null;
        }
        for (JSONObject entry : entries) {
            if (StringUtils.equals(entry.getString("id"), reportDate.getPointKey())) {
                // Only the first matching entry counts; a missing "v" yields null.
                return entry.get("v");
            }
        }
        return null;
    }

    /**
     * Tells whether a string is a plain decimal number: an optional leading minus, one or more
     * digits, an optional decimal point and optional fraction digits (e.g. {@code "-12"},
     * {@code "3.14"}, {@code "7."}).
     *
     * @param s the text to check, may be {@code null}
     * @return {@code true} if {@code s} matches that format; {@code false} otherwise, including
     *         for {@code null} and the empty string
     */
    public static boolean isNumeric(String s) {
        // The dot is escaped so it matches only a literal decimal point, not any character.
        return s != null && s.matches("-?[0-9]+\\.?[0-9]*");
    }

    /**
     * Starts the sender worker on a dedicated single-thread executor.
     * <p>
     * The worker blocks on the delay queue, and for every item taken while a client connection is
     * open it sends the ASDU, logs it and records the upload asynchronously on
     * {@code ThreadPoolUtil.afterUploadService}. Items taken while there is no open connection are
     * dropped. Any failure is logged with its stack trace and the worker continues; if the worker
     * thread is interrupted it restores the interrupt flag and stops.
     */
    private void sendMsg() {
        ExecutorService delayOrderExecutor = Executors.newSingleThreadExecutor();
        delayOrderExecutor.execute(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    ItemDelayed<UploadMsgInfo> sendCommandItemDelayed = delayMsgImpl.getAsduQueue().take();
                    if (StringUtils.isNotNull(clientConnection) && !clientConnection.isClosed()) {
                        ASdu data = sendCommandItemDelayed.getData().getAsdu();
                        String msgType = sendCommandItemDelayed.getData().getMsgType();
                        String msg = clientConnection.send(data);
                        // Logged at info level while the integration is still being debugged.
                        log.info("给省局发送报文信息:" + data.toString());
                        ThreadPoolUtil.afterUploadService.execute(() ->
                                saveUploadData(msg, data.toString(), JSONObject.toJSONString(data), msgType));
                    }
                } catch (Exception e) {
                    if (e instanceof InterruptedException) {
                        // Restore the flag so the loop condition ends the worker instead of
                        // spinning on a queue that keeps throwing.
                        Thread.currentThread().interrupt();
                        return;
                    }
                    // Pass the throwable as the last argument so the stack trace is kept.
                    log.error("发送iec报文_延迟队列_异常:" + e, e);
                }
            }
        });
    }

    /**
     * Builds the record of one sent message and hands it to {@code UploadUtil.saveUploadData}
     * together with the configured {@code msgSaveUrl}.
     *
     * @param msg        stored under {@code "content"}
     * @param msgContent stored under {@code "msgContent"}
     * @param msgJson    stored under {@code "msgInfo"}
     * @param msgType    stored under {@code "msgType"}
     */
    private void saveUploadData(String msg, String msgContent, String msgJson, String msgType) {
        final String recordId = IdUtils.simpleUUID();
        HashMap<String, Object> payload = new HashMap<>();
        payload.put("id", recordId);
        payload.put("content", msg);
        payload.put("msgContent", msgContent);
        payload.put("msgInfo", msgJson);
        payload.put("msgType", msgType);
        payload.put("uploadTime", DateUtils.getTime());
        UploadUtil.saveUploadData(msgSaveUrl, payload);
    }

    /**
     * Connect/reconnect loop for the client connection; runs until the thread is interrupted.
     * <p>
     * While {@code isClosed} is {@code true} (a connection has been established) the loop idles
     * and re-checks once per second instead of spinning. When the flag is {@code false} it tears
     * down any previous connection, builds a new one from {@code iecConfig}, registers the
     * listener and sets the flag. On failure it cleans up, logs the error and retries after
     * 10 seconds.
     */
    private void iniIecConnect() {
        while (!Thread.currentThread().isInterrupted()) {
            if (isClosed.get()) {
                // Already connected: sleep between checks so the loop does not burn a CPU core.
                threadSleep(1000);
                continue;
            }
            try {
                destroy();
                ClientConnectionBuilder clientConnectionBuilder = new ClientConnectionBuilder(iecConfig.getUrl());
                if (iecConfig.getPort() != null) {
                    clientConnectionBuilder.setPort(iecConfig.getPort());
                }
                clientConnectionBuilder.setMaxTimeNoAckReceived(254000);
                clientConnection = clientConnectionBuilder.build();
                clientConnection.setConnectionListener(new ConnectionListenerThreadImpl());
                // The link start frame (startDataTransfer) is intentionally not sent for now.
                isClosed.set(true);
                log.info("iec104 server 连接成功");
            } catch (Exception e) {
                destroy();
                log.error("iec104 创建连接失败" + e.getMessage(), e);
                // Retry the connection after 10 seconds.
                threadSleep(10000);
            }
        }
    }


    /**
     * Pauses the current thread for the given number of milliseconds.
     * <p>
     * A {@code null} or non-positive duration returns immediately. If the thread is interrupted
     * while sleeping, the method returns early and restores the interrupt flag so callers can
     * observe the interruption.
     *
     * @param sleepTime sleep duration in milliseconds
     */
    private void threadSleep(Integer sleepTime) {
        if (sleepTime == null || sleepTime <= 0) {
            return;
        }
        try {
            Thread.sleep(sleepTime);
        } catch (InterruptedException interruptedException) {
            // Do not swallow the interruption: re-assert it for the caller.
            Thread.currentThread().interrupt();
        }
    }


    private class ConnectionListenerThreadImpl implements ConnectionEventListener {

        @Override
        public void newASdu(ASdu aSdu) {
            log.info("接收到 asdu 消息" + aSdu.toString(), aSdu);
            try {
                if (aSdu.getTypeIdentification() == ASduType.C_IC_NA_1) {
                    clientConnection.sendConfirmation(aSdu, iecConfig.getCommonAddress());
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void connectionClosed(IOException e) {
            log.info("iec104 server 连接断开" + e.getMessage(), e);
            isClosed.set(false);
        }

        @Override
        public void dataTransferStateChanged(boolean b) {

        }

    }

    /**
     * Closes the current client connection if there is one and it is still open. Stopping data
     * transfer is attempted first on a best-effort basis; the connection is closed either way.
     * Also invoked on bean destruction.
     */
    @PreDestroy
    private void destroy() {
        if (StringUtils.isNull(clientConnection) || clientConnection.isClosed()) {
            return;
        }
        try {
            clientConnection.stopDataTransfer();
        } catch (IOException ignored) {
            // Best effort only: the connection is closed right below regardless.
        }
        clientConnection.close();
    }

}
